#[allow(dead_code)]

use std::sync::{Arc};
use std::collections::HashMap;
use tokio::runtime::Runtime;
use crate::util::{AsyncSendData, AsyncRecvData, channel_new, SyncRecv, SyncSender, async_data_channel_new};
use async_trait::async_trait;
use crate::data_frame::DataInstance;
use crate::conf_init::{EVENT_FLOW_NODE_CONF_INSTANCE, MapResult, MapErr, normal_conf};
use crate::source::file::{SourceFileConf, SourceFile};
use std::time::Duration;
use crate::indicators::{indicators_key_ref, Op, IndicatorsMode};
#[cfg(feature="source_zmq")]
use crate::source::zeromq::{Zmq,ZmqConf};
use std::fmt::Debug;
use tokio::sync::RwLock as TokioRwLock;
use tokio::sync::RwLockReadGuard as TokioRwLockReadGuard;
use tokio::sync::RwLockWriteGuard as TokioRwLockWriteGuard;
use once_cell::sync::OnceCell;
use linked_hash_map::LinkedHashMap;
use crate::data_frame::single_data::SingleData;

mod file;
#[cfg(feature="source_zmq")]
mod zeromq;

/// Outcome of a single [`Source::recv`] call: the received batch on success
/// (it may be empty), or a unit error that carries no further detail.
pub type SourceResult = Result<Vec<SingleData>,()>;

/// Interface every concrete data source implements.
///
/// Implementations are driven by `source_input`: `init` is retried until it
/// returns `true`, then `recv` is called repeatedly until it returns an error,
/// at which point `init` is run again.
#[async_trait]
pub trait Source:Send+Sync+Debug{
    /// Prepares the source for receiving. Returns `true` when it is ready;
    /// `false` makes the driver retry after a delay.
    async fn init(&mut self)->bool;
    /// Receives the next batch of data.
    ///
    /// `runtime` is the runtime dedicated to this source; `timeout`, `burst`
    /// and `concurrent` are forwarded unchanged from the source configuration.
    /// NOTE(review): their exact meaning is defined by each implementation —
    /// presumably max wait, max batch size and parallelism; confirm there.
    async fn recv(&mut self, runtime: &Runtime, timeout: Duration, burst: usize, concurrent: usize)->SourceResult;
}

/// Transport a source reads from, together with that transport's own
/// configuration. `source_init` dispatches on this to build the implementation.
#[derive(Deserialize,Debug)]
// Lowercase variant names — presumably so they match the keys used in the
// configuration file; TODO confirm against the config format.
#[allow(non_camel_case_types)]
pub enum SourceWay{
    /// File-backed source.
    file(SourceFileConf),
    /// ZeroMQ-backed source; only compiled with the `source_zmq` feature.
    #[cfg(feature="source_zmq")]
    zmq(ZmqConf),
}
/// Data format selector handed to the source implementation's constructor.
#[derive(Deserialize,Clone,Debug)]
// Lowercase variant names — presumably to match configuration keys; TODO confirm.
#[allow(non_camel_case_types)]
pub enum SourceFormat{
    /// JSON input.
    json,
    /// Arrow input.
    arrow,
}
/// Deserialized configuration of one source.
#[derive(Deserialize,Debug)]
pub struct SourceConf{
    /// Source name; used as the key in the global source table and in the
    /// worker thread name (`source_<name>`).
    pub name: String,
    // When false, no `SourceInstance` is created and the source is never run.
    enable: bool,
    // Number of worker threads for the runtime dedicated to this source.
    threads: usize,
    // Forwarded as the `concurrent` argument of `Source::recv`.
    concurrent: usize,
    // Forwarded as the `burst` argument of `Source::recv`.
    burst: usize,
    timeout: u64,// milliseconds; converted with `Duration::from_millis`
    // Transport and its transport-specific configuration.
    way: SourceWay,
    // Data format passed to the source constructor.
    format: SourceFormat,
}

/// Wrapper around a list of source configurations.
///
/// NOTE(review): not referenced by any other code in this file — confirm
/// whether it is still needed.
pub struct SourceListConf{
    source: Vec<SourceConf>,
}
/// Runnable state of one enabled source; moved into the `source_input` task
/// when the source is started.
#[derive(Debug)]
pub struct SourceInstance{
    // Source name, used in log lines and as the indicator key.
    name: String,
    // Shared handle to the runtime dedicated to this source; passed to `recv`.
    runtime: Arc<Runtime>,
    // Downstream senders keyed by downstream name; every received batch is
    // sent to each of them.
    next: LinkedHashMap<String,AsyncSendData>,
    // Receiving end of the control channel (see `SourceMsg`).
    ctl_recv: SyncRecv<SourceMsg>,
    /// The concrete source implementation.
    pub source: Box<dyn Source>,
    // Forwarded to `Source::recv`.
    burst: usize,
    // Forwarded to `Source::recv`.
    concurrent: usize,
    // Forwarded to `Source::recv`.
    timeout: Duration,
}
/// Control-side handle of one configured source, stored in the global source
/// table.
#[derive(Debug)]
pub struct SourceCtl{
    /// Source name (same as the table key).
    pub name: String,
    /// Whether the source is enabled in its configuration.
    pub enable: bool,
    // Runtime dedicated to this source; the source task is spawned on it.
    runtime: Arc<Runtime>,
    // Sending end of the control channel read by the running source task.
    ctl_send: SyncSender<SourceMsg>,
    // The not-yet-started instance. `None` when the source is disabled, and
    // also after the instance has been taken to be run.
    source: Option<SourceInstance>,
}
/// Control messages sent to a running source task.
#[derive(Debug)]
pub enum SourceMsg{
    /// Attach a new downstream: its name and the sender that batches are
    /// pushed into.
    AddNext(String, AsyncSendData)
}

// Global table of all sources keyed by name. Filled lazily from
// `EVENT_FLOW_NODE_CONF_INSTANCE.source` on the first call to
// `source_list_read` / `source_list_write`.
static SOURCE_LIST: OnceCell<TokioRwLock<HashMap<String, SourceCtl>>>= OnceCell::new();

pub async fn source_list_read<'a>()->TokioRwLockReadGuard<'a, HashMap<String, SourceCtl>>{
    let tmp = SOURCE_LIST.get_or_init(|| {
        match source_init_from_conf(&EVENT_FLOW_NODE_CONF_INSTANCE.source){
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("source_init_from_conf failed, [{:?}]", e);
                std::process::exit(-1);
            },
        }
    });

    tmp.read().await
}

pub async fn source_list_write<'a>()->TokioRwLockWriteGuard<'a, HashMap<String, SourceCtl>>{
    let tmp = SOURCE_LIST.get_or_init(|| {
        match source_init_from_conf(&EVENT_FLOW_NODE_CONF_INSTANCE.source){
            MapResult::Ok(t) => {t},
            MapResult::Err(e) => {
                error!("source_init_from_conf failed, [{:?}]", e);
                std::process::exit(-1);
            },
        }
    });

    tmp.write().await
}

/// Builds the concrete [`Source`] implementation selected by
/// `source_conf.way`, passing it the source name and data format.
///
/// Returns whatever the selected implementation's constructor returns.
fn source_init(source_conf: &SourceConf)->MapResult<Box<dyn Source>>{
    let name = &source_conf.name;
    let format = &source_conf.format;

    match &source_conf.way{
        SourceWay::file(file_conf) => SourceFile::new(name, format, file_conf),
        #[cfg(feature="source_zmq")]
        SourceWay::zmq(zmq_conf) => Zmq::new(name, format, zmq_conf),
    }
}

/// Builds the source table from the optional list of source configurations.
///
/// Each entry is turned into a [`SourceCtl`] and stored under its name; a
/// later entry with the same name replaces an earlier one. The first entry
/// that fails to initialise aborts the whole operation with its error.
/// `None` yields an empty table.
pub fn source_init_from_conf(conf: &Option<Vec<SourceConf>>)->MapResult<TokioRwLock<HashMap<String,SourceCtl>>>{
    info!("source_init_from_conf");

    let mut ctl_by_name = HashMap::new();

    match conf {
        None => {
            info!("source_init_from_conf no source");
        },
        Some(conf_list) => {
            for one_conf in conf_list {
                let ctl = source_init_from_conf_one(one_conf)?;
                ctl_by_name.insert(one_conf.name.to_string(), ctl);
            }
        },
    }

    info!("source_init_from_conf end");
    MapResult::Ok(TokioRwLock::new(ctl_by_name))
}
pub async fn source_register_online(source_conf: &SourceConf)->MapResult<()>{
    {
        let t =  source_list_read().await;
        if let Some(_) = t.get(&source_conf.name){
            return MapResult::Err(MapErr::new(format!("Source name is regestered")));
        }
    }

    let mut source_ctl = source_init_from_conf_one(source_conf)?;

    if let Some(source) = source_ctl.source.take(){
        source_run_one(&source_ctl.runtime, source);

        let mut source_ctl_list = source_list_write().await;

        source_ctl_list.insert(source_ctl.name.clone(), source_ctl);
        return MapResult::Ok(());
    }else{
        error!("source_register_online take source is None");
        std::process::exit(-1);
    }
}
fn source_init_from_conf_one(source_conf: &SourceConf)->MapResult<SourceCtl>{

    info!("source [{}] init from conf", source_conf.name);

    let source_runtime = if let Ok(t) = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .thread_name(format!("source_{}", source_conf.name))
        .worker_threads(source_conf.threads)
        .max_blocking_threads(1)
        .build() {
        Arc::new(t)
    } else {
        error!("source [{}] runtime init failed", source_conf.name);

        let err_desc = format!("source [{}] runtime init failed", source_conf.name);
        return MapResult::Err(MapErr::new(err_desc));
    };

    let (ctl_send, ctl_recv) = channel_new();

    let source = if source_conf.enable{
        let source = SourceInstance {
            name: source_conf.name.clone(),
            runtime: Arc::clone(&source_runtime),
            next: LinkedHashMap::new(),
            ctl_recv: ctl_recv,
            source: source_init(&source_conf)?,
            burst: source_conf.burst,
            concurrent: source_conf.concurrent,
            timeout: Duration::from_millis(source_conf.timeout),
        };
        Some(source)
    }else{
        None
    };

    let tmp_source_ctl = SourceCtl {
        name: source_conf.name.clone(),
        enable: source_conf.enable,
        runtime: source_runtime,
        ctl_send: ctl_send,
        source: source,
    };

    info!("source_init_from_conf_one [{}] end", source_conf.name);
    return MapResult::Ok(tmp_source_ctl);
}

/// Attaches a new downstream to a source through its control channel.
///
/// A fresh data channel named `<source_name>-<next_name>` is created; its
/// sending half is posted to the source as [`SourceMsg::AddNext`] and its
/// receiving half is returned to the caller.
///
/// # Errors
/// Returns an error when no source named `source_name` is registered.
pub async fn source_add_next_online(source_name: &str, next_name: &str)->MapResult<AsyncRecvData>{
    let mut ctl_map = source_list_write().await;

    // Guard clause: unknown source.
    let ctl = match ctl_map.get_mut(source_name){
        Some(ctl) => ctl,
        None => {
            return MapResult::Err(MapErr::new(format!("source [{}] not exist", source_name)));
        },
    };

    let channel_name = format!("{}-{}", source_name, next_name);
    let (data_send, data_recv) = async_data_channel_new(channel_name);

    ctl.ctl_send.send(SourceMsg::AddNext(next_name.to_string(), data_send));

    MapResult::Ok(data_recv)
}

/// Attaches a downstream to a source that has not been started yet by
/// inserting the sender directly into the stored [`SourceInstance`].
///
/// A fresh data channel named `<source_name>-<next_name>` is created and its
/// receiving half is returned.
///
/// # Errors
/// * no source named `source_name` is registered;
/// * the source has no stored instance (it is disabled or was already taken
///   to be run).
pub async fn source_add_next(source_name: &str, next_name: &str)->MapResult<AsyncRecvData>{
    info!("source [{}] add next [{}]", source_name, next_name);

    let mut ctl_map = source_list_write().await;

    let ctl = match ctl_map.get_mut(source_name){
        Some(ctl) => ctl,
        None => {
            return MapResult::Err(MapErr::new(format!("source [{}] not exist", source_name)));
        },
    };

    let instance = match ctl.source.as_mut(){
        Some(instance) => instance,
        None => {
            return MapResult::Err(MapErr::new(
                format!("source [{}] add next [{}], source is None", source_name, next_name)
            ));
        },
    };

    let (data_send, data_recv) = async_data_channel_new(format!("{}-{}", source_name, next_name));
    instance.next.insert(next_name.to_string(), data_send);

    MapResult::Ok(data_recv)
}

/// Starts every enabled source in the global table.
///
/// Each enabled source's stored instance is taken out of its [`SourceCtl`]
/// and spawned on that source's runtime. Disabled sources are only logged.
///
/// # Errors
/// Returns an error as soon as an enabled source has no stored instance;
/// sources visited before it stay started.
pub async fn source_run()->MapResult<()>{
    info!("source_run");

    let mut ctl_map = source_list_write().await;

    for (name, ctl) in ctl_map.iter_mut(){
        if !ctl.enable{
            info!("source [{}] not enable", name);
            continue;
        }

        match ctl.source.take(){
            Some(instance) => {
                source_run_one(&ctl.runtime, instance);
            },
            None => {
                return MapResult::Err(MapErr::new(format!("source {} take is None", name)));
            },
        }
    }

    MapResult::Ok(())
}

/// Logs the source's currently attached downstream names and spawns its
/// `source_input` task on `runtime`. The task's join handle is not kept.
fn source_run_one(runtime: &Runtime, source: SourceInstance){
    info!("start source [{}]", source.name);

    let next_names: Vec<_> = source.next.iter().map(|(next_name, _)| next_name).collect();
    info!("source {} next: [{:?}]", source.name, next_names);

    runtime.spawn(source_input(source));
}

/// Applies all control messages currently queued for `source`, without
/// blocking.
///
/// The whole queue is drained on each call. Handling only one message per
/// call would leave further queued downstreams unattached while batches are
/// already being forwarded.
fn source_try_add_next(source: &mut SourceInstance){
    while let Some(msg) = source.ctl_recv.try_recv(){
        match msg{
            SourceMsg::AddNext(next_name, s) => {
                source.next.insert(next_name, s);
            },
        }
    }
}

/// Driver task for one source; one such task is started per source.
///
/// The outer loop alternates between two phases:
/// 1. initialise the source, retrying once per second until `init` succeeds;
/// 2. receive batches and forward each one to every attached downstream until
///    `recv` reports an error, which sends the task back to phase 1.
///
/// The control channel is polled between steps so downstreams can be attached
/// while the task is running. The function never returns.
async fn source_input(mut source: SourceInstance){

    debug!("source {} source_input", source.name);
    let normal_conf = normal_conf();

    loop{
        // Phase 1: (re)initialise the source.
        loop{
            source_try_add_next(&mut source);

            if source.source.init().await{
                debug!("source {} init ok", source.name);
                indicators_key_ref(Op::Add, IndicatorsMode::Source, &source.name, "init ok", 1);
                break;
            }

            debug!("source {} init fail", source.name);
            indicators_key_ref(Op::Add, IndicatorsMode::Source, &source.name, "init err", 1);
            // Use the async sleep: `std::thread::sleep` would block a runtime
            // worker thread for the whole second instead of yielding it.
            tokio::time::sleep(Duration::from_secs(1)).await;
        }

        // Phase 2: pump data until the source reports an error.
        loop{
            source_try_add_next(&mut source);

            match source.source.recv(&source.runtime, source.timeout, source.burst, source.concurrent).await{
                SourceResult::Ok(data_list) => {

                    // Nothing received: back off before the next poll.
                    if data_list.is_empty(){
                        tokio::time::sleep(Duration::from_millis(normal_conf.sleep_while_null as u64)).await;
                    }

                    indicators_key_ref(Op::Add, IndicatorsMode::Source, &source.name, "recv", data_list.len());

                    source_try_add_next(&mut source);

                    // Send the batch to every downstream. One downstream is
                    // popped so it can receive the original `data_list` by
                    // move; all others get a clone. This saves one clone per
                    // batch. The popped entry is inserted again afterwards.
                    if let Some((first_name, first_next)) = source.next.pop_front(){

                        for (_next_name, next) in source.next.iter(){
                            let tmp_data = DataInstance::BatchData(data_list.clone());
                            next.send(tmp_data).await;
                        }

                        let tmp_data = DataInstance::BatchData(data_list);
                        first_next.send(tmp_data).await;

                        source.next.insert(first_name, first_next);
                    }
                },
                SourceResult::Err(_) => {
                    source_try_add_next(&mut source);

                    indicators_key_ref(Op::Add, IndicatorsMode::Source, &source.name, "recv err", 1);
                    // Leave the receive loop so the source is initialised again.
                    break;
                },
            }
        }
    }
}

#[cfg(test)]
mod test_source_conf{
    use crate::conf_init::event_flow_node_from_conf;
    use crate::source::source_init_from_conf;

    /// Smoke test: initialises logging, loads `./conf/node.yaml` and prints
    /// the parsed node configuration. It makes no assertions about the
    /// configuration's contents.
    #[test]
    pub fn test_source_node(){
        println!("test_source_node");
        if let Err(e) = log4rs::init_file("./conf/log4rs.yaml", Default::default()){
            // Fail this test only. `std::process::exit` would terminate the
            // whole test binary, including any other tests still running.
            panic!("log4rs init_file failed, {}", e);
        }

        let node_conf = event_flow_node_from_conf("./conf/node.yaml");

        println!("test_source_node {:?}", node_conf);
    }
}